#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <stdarg.h>
#include "blockstore.h"
#define BLOCKSTORE_REMOTE
+//#define BSDEBUG
-#ifdef BLOCKSTORE_REMOTE
+/*****************************************************************************
+ * Debugging
+ */
+#ifdef BSDEBUG
+void DB(char *format, ...)
+{
+ va_list args;
+
+ va_start(args, format);
+ vfprintf(stderr, format, args);
+ va_end(args);
+}
+#else
+#define DB(format, ...) (void)0
+#endif
-//#define BSDEBUG
+#ifdef BLOCKSTORE_REMOTE
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <netdb.h>
-#define ENTER_QUEUE_CR (void)0
-#define LEAVE_QUEUE_CR (void)0
+/*****************************************************************************
+ * *
+ *****************************************************************************/
+
+/*****************************************************************************
+ * Network state *
+ *****************************************************************************/
+/* The individual disk servers we talks to. These will be referenced by
+ * an integer index into bsservers[].
+ */
bsserver_t bsservers[MAX_SERVERS];
+
+/* The cluster map. This is indexed by an integer cluster number.
+ */
bscluster_t bsclusters[MAX_CLUSTERS];
+/* Local socket.
+ */
struct sockaddr_in sin_local;
int bssock = 0;
+/*****************************************************************************
+ * Message queue management *
+ *****************************************************************************/
+
+/* Protects the queue manipulation critcal regions.
+ */
+#define ENTER_QUEUE_CR (void)0
+#define LEAVE_QUEUE_CR (void)0
+
+/* A message queue entry. We allocate one of these for every request we send.
+ * Asynchronous reply reception also used one of these.
+ */
typedef struct bsq_t_struct {
struct bsq_t_struct *prev;
struct bsq_t_struct *next;
+ int status;
int server;
int length;
struct msghdr msghdr;
void *block;
} bsq_t;
+#define BSQ_STATUS_MATCHED 1
+
+#define ENTER_LUID_CR (void)0
+#define LEAVE_LUID_CR (void)0
+
+static u64 luid_cnt = 0x1000ULL;
+u64 new_luid(void) {
+ u64 luid;
+ ENTER_LUID_CR;
+ luid = luid_cnt++;
+ LEAVE_LUID_CR;
+ return luid;
+}
+
+/* Queue of outstanding requests.
+ */
bsq_t *bs_head = NULL;
bsq_t *bs_tail = NULL;
+int bs_qlen = 0;
+
+/*
+ */
+void queuedebug(char *msg) {
+ bsq_t *q;
+ ENTER_QUEUE_CR;
+ fprintf(stderr, "Q: %s len=%u\n", msg, bs_qlen);
+ for (q = bs_head; q; q = q->next) {
+ fprintf(stderr, " luid=%016llx server=%u\n",
+ q->message.luid, q->server);
+ }
+ LEAVE_QUEUE_CR;
+}
+
+int enqueue(bsq_t *qe) {
+ ENTER_QUEUE_CR;
+ qe->next = NULL;
+ qe->prev = bs_tail;
+ if (!bs_head)
+ bs_head = qe;
+ else
+ bs_tail->next = qe;
+ bs_tail = qe;
+ bs_qlen++;
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("enqueue");
+#endif
+ return 0;
+}
+
+int dequeue(bsq_t *qe) {
+ bsq_t *q;
+ ENTER_QUEUE_CR;
+ for (q = bs_head; q; q = q->next) {
+ if (q == qe) {
+ if (q->prev)
+ q->prev->next = q->next;
+ else
+ bs_head = q->next;
+ if (q->next)
+ q->next->prev = q->prev;
+ else
+ bs_tail = q->prev;
+ bs_qlen--;
+ goto found;
+ }
+ }
+
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("dequeue not found");
+#endif
+ return 0;
+
+ found:
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("dequeue not found");
+#endif
+ return 1;
+}
+
+bsq_t *queuesearch(bsq_t *qe) {
+ bsq_t *q;
+ ENTER_QUEUE_CR;
+ for (q = bs_head; q; q = q->next) {
+ if ((qe->server == q->server) &&
+ (qe->message.operation == q->message.operation) &&
+ (qe->message.luid == q->message.luid)) {
+
+ if ((q->message.operation == BSOP_READBLOCK) &&
+ ((q->message.flags & BSOP_FLAG_ERROR) == 0)) {
+ q->block = qe->block;
+ qe->block = NULL;
+ }
+ q->length = qe->length;
+ q->message.flags = qe->message.flags;
+ q->message.id = qe->message.id;
+ q->status |= BSQ_STATUS_MATCHED;
+
+ if (q->prev)
+ q->prev->next = q->next;
+ else
+ bs_head = q->next;
+ if (q->next)
+ q->next->prev = q->prev;
+ else
+ bs_tail = q->prev;
+ q->next = NULL;
+ q->prev = NULL;
+ bs_qlen--;
+ goto found;
+ }
+ }
+
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("queuesearch not found");
+#endif
+ return NULL;
+
+ found:
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("queuesearch found");
+#endif
+ return q;
+}
int send_message(bsq_t *qe) {
int rc;
qe->iov[1].iov_len = BLOCK_SIZE;
}
- rc = sendmsg(bssock, &(qe->msghdr), 0);
+ qe->message.luid = new_luid();
+
+ qe->status = 0;
+ if (enqueue(qe) < 0) {
+ fprintf(stderr, "Error enqueuing request.\n");
+ return -1;
+ }
+
+ DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid);
+ rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
//rc = sendto(bssock, (void *)&(qe->message), qe->length, 0,
// (struct sockaddr *)&(bsservers[qe->server].sin),
// sizeof(struct sockaddr_in));
if (rc < 0)
return rc;
-
- ENTER_QUEUE_CR;
-
- LEAVE_QUEUE_CR;
return rc;
}
return rc;
}
+int get_server_number(struct sockaddr_in *sin) {
+ int i;
+
+#ifdef BSDEBUG2
+ fprintf(stderr,
+ "get_server_number(%u.%u.%u.%u/%u)\n",
+ (unsigned int)sin->sin_addr.s_addr & 0xff,
+ ((unsigned int)sin->sin_addr.s_addr >> 8) & 0xff,
+ ((unsigned int)sin->sin_addr.s_addr >> 16) & 0xff,
+ ((unsigned int)sin->sin_addr.s_addr >> 24) & 0xff,
+ (unsigned int)sin->sin_port);
+#endif
+
+ for (i = 0; i < MAX_SERVERS; i++) {
+ if (bsservers[i].hostname) {
+#ifdef BSDEBUG2
+ fprintf(stderr,
+ "get_server_number check %u.%u.%u.%u/%u\n",
+ (unsigned int)bsservers[i].sin.sin_addr.s_addr&0xff,
+ ((unsigned int)bsservers[i].sin.sin_addr.s_addr >> 8)&0xff,
+ ((unsigned int)bsservers[i].sin.sin_addr.s_addr >> 16)&0xff,
+ ((unsigned int)bsservers[i].sin.sin_addr.s_addr >> 24)&0xff,
+ (unsigned int)bsservers[i].sin.sin_port);
+#endif
+ if ((sin->sin_family == bsservers[i].sin.sin_family) &&
+ (sin->sin_port == bsservers[i].sin.sin_port) &&
+ (memcmp((void *)&(sin->sin_addr),
+ (void *)&(bsservers[i].sin.sin_addr),
+ sizeof(struct in_addr)) == 0)) {
+ return i;
+ }
+ }
+ }
+
+ return -1;
+}
+
+void *rx_buffer = NULL;
+bsq_t rx_qe;
+bsq_t *recv_any(void) {
+ struct sockaddr_in from;
+ int rc;
+
+ DB("ENTER recv_any\n");
+
+ rx_qe.msghdr.msg_name = &from;
+ rx_qe.msghdr.msg_namelen = sizeof(struct sockaddr_in);
+ rx_qe.msghdr.msg_iov = rx_qe.iov;
+ if (!rx_buffer) {
+ rx_buffer = malloc(BLOCK_SIZE);
+ if (!rx_buffer) {
+ perror("recv_any malloc");
+ return NULL;
+ }
+ }
+ rx_qe.block = rx_buffer;
+ rx_buffer = NULL;
+ rx_qe.msghdr.msg_iovlen = 2;
+ rx_qe.msghdr.msg_control = NULL;
+ rx_qe.msghdr.msg_controllen = 0;
+ rx_qe.msghdr.msg_flags = 0;
+
+ rx_qe.iov[0].iov_base = (void *)&(rx_qe.message);
+ rx_qe.iov[0].iov_len = MSGBUFSIZE_ID;
+ rx_qe.iov[1].iov_base = rx_qe.block;
+ rx_qe.iov[1].iov_len = BLOCK_SIZE;
+
+ rc = recvmsg(bssock, &(rx_qe.msghdr), 0);
+ if (rc < 0) {
+ perror("recv_any");
+ return NULL;
+ }
+ rx_qe.length = rc;
+ rx_qe.server = get_server_number(&from);
+
+ DB("recv_any from %d luid=%016llx len=%u\n",
+ rx_qe.server, rx_qe.message.luid, rx_qe.length);
+
+ return &rx_qe;
+}
+
+void recv_recycle_buffer(bsq_t *q) {
+ if (q->block) {
+ rx_buffer = q->block;
+ q->block = NULL;
+ }
+}
+
+// cycle through reading any incoming, searching for a match in the
+// queue, until we have all we need.
+int wait_recv(bsq_t **reqs, int numreqs) {
+ bsq_t *q, *m;
+ unsigned int x, i;
+
+ DB("ENTER wait_recv %u\n", numreqs);
+
+ checkmatch:
+ x = 0xffffffff;
+ for (i = 0; i < numreqs; i++) {
+ x &= reqs[i]->status;
+ }
+ if ((x & BSQ_STATUS_MATCHED)) {
+ DB("LEAVE wait_recv\n");
+ return numreqs;
+ }
+
+ rxagain:
+ q = recv_any();
+ if (!q)
+ return -1;
+
+ m = queuesearch(q);
+ recv_recycle_buffer(q);
+ if (!m) {
+ fprintf(stderr, "Unmatched RX\n");
+ goto rxagain;
+ }
+
+ goto checkmatch;
+
+}
+
void *readblock_indiv(int server, u64 id) {
void *block;
bsq_t *qe;
- int len;
+ int len, rc;
qe = (bsq_t *)malloc(sizeof(bsq_t));
if (!qe) {
perror("readblock qe malloc");
return NULL;
}
+ qe->block = NULL;
+
+ /*
qe->block = malloc(BLOCK_SIZE);
if (!qe->block) {
perror("readblock qe malloc");
free((void *)qe);
return NULL;
}
+ */
qe->server = server;
goto err;
}
- len = recv_message(qe);
+ /*len = recv_message(qe);
if (len < 0) {
perror("readblock recv");
goto err;
+ }*/
+
+ rc = wait_recv(&qe, 1);
+ if (rc < 0) {
+ perror("readblock recv");
+ goto err;
}
+
if ((qe->message.flags & BSOP_FLAG_ERROR)) {
fprintf(stderr, "readblock server error\n");
goto err;
}
- if (len < MSGBUFSIZE_BLOCK) {
+ if (qe->length < MSGBUFSIZE_BLOCK) {
fprintf(stderr, "readblock recv short (%u)\n", len);
goto err;
}
- if ((block = malloc(BLOCK_SIZE)) == NULL) {
+ /* if ((block = malloc(BLOCK_SIZE)) == NULL) {
perror("readblock malloc");
goto err;
}
- //memcpy(block, qe->message.block, BLOCK_SIZE);
+ memcpy(block, qe->message.block, BLOCK_SIZE);
+ */
block = qe->block;
free((void *)qe);
return block;
err:
- free(qe->block);
+ if (qe->block)
+ free(qe->block);
free((void *)qe);
return NULL;
}
return block;
}
-int writeblock_indiv(int server, u64 id, void *block) {
+bsq_t *writeblock_indiv(int server, u64 id, void *block) {
+
bsq_t *qe;
int len;
perror("writeblock sendto");
goto err;
}
-
- len = recv_message(qe);
- if (len < 0) {
- perror("writeblock recv");
- goto err;
- }
- if ((qe->message.flags & BSOP_FLAG_ERROR)) {
- fprintf(stderr, "writeblock server error\n");
- goto err;
- }
- if (len < MSGBUFSIZE_ID) {
- fprintf(stderr, "writeblock recv short (%u)\n", len);
- goto err;
- }
- free((void *)qe);
- return 0;
+ return qe;
err:
free((void *)qe);
- return -1;
+ return NULL;
}
+
/**
* writeblock: write an existing block to disk
* @return: zero on success, -1 on failure
*/
int writeblock(u64 id, void *block) {
- int map = (int)BSID_MAP(id);
+ int map = (int)BSID_MAP(id);
int rep0 = bsclusters[map].servers[0];
int rep1 = bsclusters[map].servers[1];
int rep2 = bsclusters[map].servers[2];
+ bsq_t *reqs[3];
+ int rc;
+
+ reqs[0] = reqs[1] = reqs[2] = NULL;
#ifdef BSDEBUG
fprintf(stderr,
(unsigned int)((unsigned char *)block)[7]);
#endif
-/* special case for the "superblock" just use the first block on the
+ /* special case for the "superblock" just use the first block on the
* first replica. (extend to blocks < 6 for vdi bug)
*/
if (id < 6) {
- return writeblock_indiv(rep0, id, block);
+ reqs[0] = writeblock_indiv(rep0, id, block);
+ if (!reqs[0])
+ return -1;
+ rc = wait_recv(reqs, 1);
+ return rc;
}
- if (writeblock_indiv(rep0, BSID_REPLICA0(id), block) < 0)
- return -1;
- if (writeblock_indiv(rep1, BSID_REPLICA1(id), block) < 0)
- return -1;
- if (writeblock_indiv(rep2, BSID_REPLICA2(id), block) < 0)
- return -1;
+ reqs[0] = writeblock_indiv(rep0, BSID_REPLICA0(id), block);
+ if (!reqs[0])
+ goto err;
+ reqs[1] = writeblock_indiv(rep1, BSID_REPLICA1(id), block);
+ if (!reqs[1])
+ goto err;
+ reqs[2] = writeblock_indiv(rep2, BSID_REPLICA2(id), block);
+ if (!reqs[2])
+ goto err;
+
+ rc = wait_recv(reqs, 3);
+ if (rc < 0) {
+ perror("writeblock recv");
+ goto err;
+ }
+ if ((reqs[0]->message.flags & BSOP_FLAG_ERROR)) {
+ fprintf(stderr, "writeblock server0 error\n");
+ goto err;
+ }
+ if ((reqs[1]->message.flags & BSOP_FLAG_ERROR)) {
+ fprintf(stderr, "writeblock server1 error\n");
+ goto err;
+ }
+ if ((reqs[2]->message.flags & BSOP_FLAG_ERROR)) {
+ fprintf(stderr, "writeblock server2 error\n");
+ goto err;
+ }
+
+
+ free((void *)reqs[0]);
+ free((void *)reqs[1]);
+ free((void *)reqs[2]);
return 0;
+
+ err:
+ if (reqs[0]) {
+ dequeue(reqs[0]);
+ free((void *)reqs[0]);
+ }
+ if (reqs[1]) {
+ dequeue(reqs[1]);
+ free((void *)reqs[1]);
+ }
+ if (reqs[2]) {
+ dequeue(reqs[2]);
+ free((void *)reqs[2]);
+ }
+ return -1;
}
/**
return allocblock_hint(block, 0);
}
-u64 allocblock_hint_indiv(int server, void *block, u64 hint) {
+bsq_t *allocblock_hint_indiv(int server, void *block, u64 hint) {
bsq_t *qe;
int len;
goto err;
}
- len = recv_message(qe);
- if (len < 0) {
- perror("allocblock_hint recv");
- goto err;
- }
- if ((qe->message.flags & BSOP_FLAG_ERROR)) {
- fprintf(stderr, "allocblock_hint server error\n");
- goto err;
- }
- if (len < MSGBUFSIZE_ID) {
- fprintf(stderr, "allocblock_hint recv short (%u)\n", len);
- goto err;
- }
-
- free((void *)qe);
- return qe->message.id;
+ return qe;
err:
free((void *)qe);
- return 0;
+ return NULL;
}
/**
*/
u64 allocblock_hint(void *block, u64 hint) {
int map = (int)hint;
-
int rep0 = bsclusters[map].servers[0];
int rep1 = bsclusters[map].servers[1];
int rep2 = bsclusters[map].servers[2];
-
+ bsq_t *reqs[3];
+ int rc;
u64 id0, id1, id2;
- id0 = allocblock_hint_indiv(rep0, block, 0);
- if (id0 == 0)
- return 0;
- id1 = allocblock_hint_indiv(rep1, block, 0);
- if (id1 == 0)
- return 0;
- id2 = allocblock_hint_indiv(rep2, block, 0);
- if (id2 == 0)
- return 0;
+ reqs[0] = reqs[1] = reqs[2] = NULL;
+
+ DB("ENTER allocblock\n");
+
+ reqs[0] = allocblock_hint_indiv(rep0, block, hint);
+ if (!reqs[0])
+ goto err;
+ reqs[1] = allocblock_hint_indiv(rep1, block, hint);
+ if (!reqs[1])
+ goto err;
+ reqs[2] = allocblock_hint_indiv(rep2, block, hint);
+ if (!reqs[2])
+ goto err;
+
+ rc = wait_recv(reqs, 3);
+ if (rc < 0) {
+ perror("allocblock recv");
+ goto err;
+ }
+ if ((reqs[0]->message.flags & BSOP_FLAG_ERROR)) {
+ fprintf(stderr, "allocblock server0 error\n");
+ goto err;
+ }
+ if ((reqs[1]->message.flags & BSOP_FLAG_ERROR)) {
+ fprintf(stderr, "allocblock server1 error\n");
+ goto err;
+ }
+ if ((reqs[2]->message.flags & BSOP_FLAG_ERROR)) {
+ fprintf(stderr, "allocblock server2 error\n");
+ goto err;
+ }
+
+ id0 = reqs[0]->message.id;
+ id1 = reqs[1]->message.id;
+ id2 = reqs[2]->message.id;
#ifdef BSDEBUG
fprintf(stderr, "ALLOC: %016llx %02x%02x %02x%02x %02x%02x %02x%02x\n",
(unsigned int)((unsigned char *)block)[6],
(unsigned int)((unsigned char *)block)[7]);
#endif
-
+
+ free((void *)reqs[0]);
+ free((void *)reqs[1]);
+ free((void *)reqs[2]);
return BSID(map, id0, id1, id2);
+
+ err:
+ if (reqs[0]) {
+ dequeue(reqs[0]);
+ free((void *)reqs[0]);
+ }
+ if (reqs[1]) {
+ dequeue(reqs[1]);
+ free((void *)reqs[1]);
+ }
+ if (reqs[2]) {
+ dequeue(reqs[2]);
+ free((void *)reqs[2]);
+ }
+ return 0;
}
#else /* /BLOCKSTORE_REMOTE */
int i;
bsservers[0].hostname = "firebug.cl.cam.ac.uk";
- bsservers[1].hostname = "tetris.cl.cam.ac.uk";
- bsservers[2].hostname = "donkeykong.cl.cam.ac.uk";
- bsservers[3].hostname = "gunfighter.cl.cam.ac.uk";
- bsservers[4].hostname = "galaxian.cl.cam.ac.uk";
- bsservers[5].hostname = "firetrack.cl.cam.ac.uk";
- bsservers[6].hostname = "funfair.cl.cam.ac.uk";
- bsservers[7].hostname = "felix.cl.cam.ac.uk";
+ bsservers[1].hostname = "planb.cl.cam.ac.uk";
+ bsservers[2].hostname = "simcity.cl.cam.ac.uk";
+ bsservers[3].hostname = NULL/*"gunfighter.cl.cam.ac.uk"*/;
+ bsservers[4].hostname = NULL/*"galaxian.cl.cam.ac.uk"*/;
+ bsservers[5].hostname = NULL/*"firetrack.cl.cam.ac.uk"*/;
+ bsservers[6].hostname = NULL/*"funfair.cl.cam.ac.uk"*/;
+ bsservers[7].hostname = NULL/*"felix.cl.cam.ac.uk"*/;
bsservers[8].hostname = NULL;
bsservers[9].hostname = NULL;
bsservers[10].hostname = NULL;